-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-33822][SQL] Use the CastSupport.cast method in HashJoin
#30818
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
To add unit tests, I'm looking for a simpler query to reproduce this. But, I've not found it yet. |
dongjoon-hyun
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wow, super quick! @maropu :)
dongjoon-hyun
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1, LGTM. Thank you, @maropu . I manually verified that this PR fixes TPCDS Q5 error.
|
BTW, @maropu . Why do you make another JIRA for this? Also, the new JIRA seems to be an If you don't mind, shall we use the original issue, SPARK-33822 (Bug) instead of SPARK-33823 (Improvement)? |
|
Yeah let's port it back. |
CastSupport.cast method in HashJoinCastSupport.cast method in HashJoin
|
Kubernetes integration test starting |
|
Thanks, I merged two issues by copying all description into one JIRA (SPARK-33822 Bug). |
Yea, it's just my mistake... (I didn't notice that you filed the ticket for this). |
|
Kubernetes integration test status success |
cloud-fan
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good catch!
|
Test build #132937 has finished for PR 30818 at commit
|
|
retest this please |
gengliangwang
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the quick fix!
|
Test build #132943 has finished for PR 30818 at commit
|
|
Could you fix the test suite, @maropu ? The following is failing due to the session timezone. |
viirya
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nice catch!
|
Sure, @dongjoon-hyun |
|
Thank you, @maropu and all. The last commit is only about the two test suites and I verified them locally. |
|
Thanks for the quick response, @dongjoon-hyun ! |
### What changes were proposed in this pull request? This PR intends to fix the bug that throws a unsupported exception when running [the TPCDS q5](https://github.com/apache/spark/blob/master/sql/core/src/test/resources/tpcds/q5.sql) with AQE enabled ([this option is enabled by default now via SPARK-33679](031c5ef)): ``` java.lang.UnsupportedOperationException: BroadcastExchange does not support the execute() code path. at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecute(BroadcastExchangeExec.scala:189) at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180) at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215) at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176) at org.apache.spark.sql.execution.exchange.ReusedExchangeExec.doExecute(Exchange.scala:60) at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180) at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215) at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176) at org.apache.spark.sql.execution.adaptive.QueryStageExec.doExecute(QueryStageExec.scala:115) at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180) at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215) at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176) at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:321) at org.apache.spark.sql.execution.SparkPlan.executeCollectIterator(SparkPlan.scala:397) at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.$anonfun$relationFuture$1(BroadcastExchangeExec.scala:118) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$1(SQLExecution.scala:185) at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ... ``` I've checked the AQE code and I found `EnsureRequirements` wrongly puts `BroadcastExchange` on a top of `BroadcastQueryStage` in the `reOptimize` phase as follows: ``` +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#2183] +- BroadcastQueryStage 2 +- ReusedExchange [d_date_sk#1086], BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#1963] ``` A root cause is that a `Cast` class in a required child's distribution does not have a `timeZoneId` field (`timeZoneId=None`), and a `Cast` class in `child.outputPartitioning` has it. So, this difference can make the distribution requirement check fail in `EnsureRequirements`: https://github.com/apache/spark/blob/1e85707738a830d33598ca267a6740b3f06b1861/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala#L47-L50 The `Cast` class that does not have a `timeZoneId` field is generated in the `HashJoin` object. To fix this issue, this PR proposes to use the `CastSupport.cast` method there. ### Why are the changes needed? Bugfix. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Manually checked that q5 passed. Closes #30818 from maropu/BugfixInAQE. Authored-by: Takeshi Yamamuro <yamamuro@apache.org> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org> (cherry picked from commit 51ef443) Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
### What changes were proposed in this pull request? This PR intends to fix the bug that throws a unsupported exception when running [the TPCDS q5](https://github.com/apache/spark/blob/master/sql/core/src/test/resources/tpcds/q5.sql) with AQE enabled ([this option is enabled by default now via SPARK-33679](031c5ef)): ``` java.lang.UnsupportedOperationException: BroadcastExchange does not support the execute() code path. at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecute(BroadcastExchangeExec.scala:189) at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180) at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215) at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176) at org.apache.spark.sql.execution.exchange.ReusedExchangeExec.doExecute(Exchange.scala:60) at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180) at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215) at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176) at org.apache.spark.sql.execution.adaptive.QueryStageExec.doExecute(QueryStageExec.scala:115) at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180) at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215) at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176) at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:321) at org.apache.spark.sql.execution.SparkPlan.executeCollectIterator(SparkPlan.scala:397) at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.$anonfun$relationFuture$1(BroadcastExchangeExec.scala:118) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$1(SQLExecution.scala:185) at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ... ``` I've checked the AQE code and I found `EnsureRequirements` wrongly puts `BroadcastExchange` on a top of `BroadcastQueryStage` in the `reOptimize` phase as follows: ``` +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#2183] +- BroadcastQueryStage 2 +- ReusedExchange [d_date_sk#1086], BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#1963] ``` A root cause is that a `Cast` class in a required child's distribution does not have a `timeZoneId` field (`timeZoneId=None`), and a `Cast` class in `child.outputPartitioning` has it. So, this difference can make the distribution requirement check fail in `EnsureRequirements`: https://github.com/apache/spark/blob/1e85707738a830d33598ca267a6740b3f06b1861/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala#L47-L50 The `Cast` class that does not have a `timeZoneId` field is generated in the `HashJoin` object. To fix this issue, this PR proposes to use the `CastSupport.cast` method there. ### Why are the changes needed? Bugfix. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Manually checked that q5 passed. Closes #30818 from maropu/BugfixInAQE. Authored-by: Takeshi Yamamuro <yamamuro@apache.org> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org> (cherry picked from commit 51ef443) Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
|
Ur, I forgot that |
|
okay, I will do it now. |
|
Thank you! @maropu |
### What changes were proposed in this pull request? This PR intends to fix the bug that throws a unsupported exception when running [the TPCDS q5](https://github.com/apache/spark/blob/master/sql/core/src/test/resources/tpcds/q5.sql) with AQE enabled ([this option is enabled by default now via SPARK-33679](031c5ef)): ``` java.lang.UnsupportedOperationException: BroadcastExchange does not support the execute() code path. at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecute(BroadcastExchangeExec.scala:189) at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180) at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215) at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176) at org.apache.spark.sql.execution.exchange.ReusedExchangeExec.doExecute(Exchange.scala:60) at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180) at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215) at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176) at org.apache.spark.sql.execution.adaptive.QueryStageExec.doExecute(QueryStageExec.scala:115) at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180) at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215) at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176) at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:321) at org.apache.spark.sql.execution.SparkPlan.executeCollectIterator(SparkPlan.scala:397) at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.$anonfun$relationFuture$1(BroadcastExchangeExec.scala:118) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$1(SQLExecution.scala:185) at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ... ``` I've checked the AQE code and I found `EnsureRequirements` wrongly puts `BroadcastExchange` on a top of `BroadcastQueryStage` in the `reOptimize` phase as follows: ``` +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#2183] +- BroadcastQueryStage 2 +- ReusedExchange [d_date_sk#1086], BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#1963] ``` A root cause is that a `Cast` class in a required child's distribution does not have a `timeZoneId` field (`timeZoneId=None`), and a `Cast` class in `child.outputPartitioning` has it. So, this difference can make the distribution requirement check fail in `EnsureRequirements`: https://github.com/apache/spark/blob/1e85707738a830d33598ca267a6740b3f06b1861/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala#L47-L50 The `Cast` class that does not have a `timeZoneId` field is generated in the `HashJoin` object. To fix this issue, this PR proposes to use the `CastSupport.cast` method there. This is a backport PR for #30818. ### Why are the changes needed? Bugfix. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Manually checked that q5 passed with AQE enabled. Closes #30830 from maropu/SPARK-33822-BRANCH3.0. Authored-by: Takeshi Yamamuro <yamamuro@apache.org> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
|
Could you please tell if this has been fixed in 3.2 and the newer versions as well? The ticket lists 3.0.0, 3.0.1, 3.1.0, 3.2.0 as the affected versions and 3.0.2, 3.1.0 as fix versions. I believe we're having this issue in 3.2.1. |
|
It's fixed in 3.1.0 and forward, including 3.2.0. You may hit a different bug, please open a new JIRA ticket, thanks for reporting! |

What changes were proposed in this pull request?
This PR intends to fix the bug that throws a unsupported exception when running the TPCDS q5 with AQE enabled (this option is enabled by default now via SPARK-33679):
I've checked the AQE code and I found
EnsureRequirementswrongly putsBroadcastExchangeon a top ofBroadcastQueryStagein thereOptimizephase as follows:A root cause is that a
Castclass in a required child's distribution does not have atimeZoneIdfield (timeZoneId=None), and aCastclass inchild.outputPartitioninghas it. So, this difference can make the distribution requirement check fail inEnsureRequirements:spark/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
Lines 47 to 50 in 1e85707
The
Castclass that does not have atimeZoneIdfield is generated in theHashJoinobject. To fix this issue, this PR proposes to use theCastSupport.castmethod there.Why are the changes needed?
Bugfix.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Manually checked that q5 passed.